package spark.accumulator;

import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import spark.util.SparkUtil;

import java.util.Arrays;

/**
 * Demonstrates Spark accumulators through both the deprecated 1.x API
 * ({@link Accumulator}) and the 2.x API ({@link LongAccumulator} and a custom
 * accumulator registered on the SparkContext).
 *
 * <p>Usage: pass the demo name as the first program argument. Recognised names are
 * {@code accumulator1x}, {@code accumulator2x}, {@code selfAccumulator1x} and
 * {@code selfAccumulator2x}. With no argument, {@code selfAccumulator2x} runs.
 *
 * @author jiantao7
 * @create 2018-05-21 9:08
 */
public class AccumulatorDemoJava {

    /** Demo executed when no (or an unrecognised) demo name is supplied. */
    private static final String DEFAULT_DEMO = "selfAccumulator2x";

    /**
     * Entry point: builds the Spark handles through {@code SparkUtil}, runs the selected
     * demo and stops the session afterwards.
     *
     * @param args optional; {@code args[0]} names the demo to run
     */
    public static void main(String[] args) {
        String demoName = (args != null && args.length > 0) ? args[0] : DEFAULT_DEMO;

        SparkUtil sparkUtil = new SparkUtil("accumulator", true);
        SparkSession spark = sparkUtil.getSparkSession();
        try {
            JavaSparkContext jsc = sparkUtil.getJavaSparkContext();
            runDemo(demoName, jsc);
        } finally {
            // Release Spark resources even when the demo throws.
            // NOTE(review): presumably the JavaSparkContext wraps the session's
            // SparkContext, so stopping the session covers both - confirm in SparkUtil.
            if (spark != null) {
                spark.stop();
            }
        }
    }

    /**
     * Dispatches to the demo method matching {@code demoName}.
     * An unrecognised name is reported on stderr and the default demo runs instead.
     *
     * @param demoName name of the demo method to execute
     * @param jsc      context used to build the sample RDD
     */
    private static void runDemo(String demoName, JavaSparkContext jsc) {
        switch (demoName) {
            case "accumulator1x":
                accumulator1x(jsc);
                break;
            case "accumulator2x":
                accumulator2x(jsc);
                break;
            case "selfAccumulator1x":
                selfAccumulator1x(jsc);
                break;
            case "selfAccumulator2x":
                selfAccumulator2x(jsc);
                break;
            default:
                System.err.println("Unknown demo '" + demoName + "', running " + DEFAULT_DEMO);
                selfAccumulator2x(jsc);
                break;
        }
    }

    /**
     * Custom accumulator, Spark 2.x style: registers a {@code MyAccumulator2x} on the
     * underlying SparkContext, feeds it every element of a five-element RDD and prints
     * the resulting value.
     *
     * @param jsc context used to register the accumulator and build the RDD
     */
    private static void selfAccumulator2x(JavaSparkContext jsc) {
        MyAccumulator2x myAccumulator2x = new MyAccumulator2x();
        // The accumulator must be registered before it is used inside an action.
        jsc.sc().register(myAccumulator2x);

        jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5))
                .foreach((VoidFunction<Integer>) myAccumulator2x::add);
        System.out.println(myAccumulator2x.value());
    }

    /**
     * Custom accumulator, Spark 1.x style: creates an {@link Accumulator} driven by
     * {@code MyAccumulatorParam1x}, adds every element of a five-element RDD to it and
     * prints the resulting value.
     *
     * @param jsc context used to create the accumulator and build the RDD
     */
    @SuppressWarnings("deprecation") // intentionally demonstrates the deprecated 1.x API
    private static void selfAccumulator1x(JavaSparkContext jsc) {
        // NOTE(review): the initial value is 1 here but 0 in accumulator1x - confirm this
        // matches the semantics of MyAccumulatorParam1x.
        Accumulator<Integer> accumulator = jsc.accumulator(1, new MyAccumulatorParam1x());

        jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5))
                .foreach((VoidFunction<Integer>) accumulator::add);
        System.out.println(accumulator.value());
    }

    /**
     * Built-in accumulator, Spark 2.x style: creates a named {@link LongAccumulator},
     * adds every element of a five-element RDD to it and prints the resulting value.
     *
     * @param jsc context used to create the accumulator and build the RDD
     */
    private static void accumulator2x(JavaSparkContext jsc) {
        LongAccumulator longAccumulator = jsc.sc().longAccumulator("longAccumulator");
        jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5))
                .foreach((VoidFunction<Integer>) longAccumulator::add);
        System.out.println(longAccumulator.value());
    }

    /**
     * Built-in accumulator, Spark 1.x style (deprecated API): creates a named integer
     * {@link Accumulator} starting at 0, adds every element of a five-element RDD to it
     * and prints the resulting value.
     *
     * @param sc context used to create the accumulator and build the RDD
     */
    @SuppressWarnings("deprecation") // intentionally demonstrates the deprecated 1.x API
    private static void accumulator1x(JavaSparkContext sc) {
        Accumulator<Integer> accumulator = sc.accumulator(0, "hei");
        sc.parallelize(Arrays.asList(1, 2, 3, 4, 5))
                .foreach((VoidFunction<Integer>) accumulator::add);

        System.out.println(accumulator.value());
    }
}